Use MarkDistinct only in limited cases #15927

@lukasz-stec lukasz-stec commented Feb 1, 2023

Description

MarkDistinct is beneficial only when a query has limited parallelism (e.g. a global distinct aggregation is handled by a single thread).
When there is enough parallelism, MarkDistinct hurts performance because it causes excessive data shuffling and, with multiple distinct aggregations, introduces multiple stages.

This PR also changes the type of the use_mark_distinct session property and the optimizer.use-mark-distinct config property from Boolean to Varchar. The new property still accepts the values true and false for backward compatibility, but they have to be specified as Varchars (e.g. in single quotes in the CLI).

Additional context and related issues

To test the different possible approaches, I used queries that vary the number of distinct values of the group-by key and contain 1 or 2 distinct aggregations plus one non-distinct aggregation.
I used three different cardinalities:
small: 502 unique values
medium: 10K unique values
high: 240M unique values

The tests show that for small NDV mark distinct works better and for high NDV distinct aggregation is better.
In the medium case, optimize_mixed_distinct_aggregations is better for the duration at a significant CPU cost.

Queries
small cardinality, one distinct aggr

SELECT ss_store_sk, Sum(ss_quantity), Count(DISTINCT ss_customer_sk) FROM hive.tpcds_sf1000_orc.store_sales GROUP BY ss_store_sk;

small cardinality, 2 distinct aggrs

SELECT ss_store_sk, Sum(ss_quantity), Count(DISTINCT ss_customer_sk), Count(DISTINCT ss_promo_sk) FROM hive.tpcds_sf1000_orc.store_sales GROUP BY ss_store_sk;

medium cardinality, 1 distinct aggr

SELECT ss_ticket_number % 10000, Sum(ss_quantity), Count(DISTINCT ss_customer_sk) FROM hive.tpcds_sf1000_orc.store_sales GROUP BY 1;

medium cardinality, 2 distinct aggrs

SELECT ss_ticket_number % 10000, Sum(ss_quantity), Count(DISTINCT ss_customer_sk), Count(DISTINCT ss_promo_sk) FROM hive.tpcds_sf1000_orc.store_sales GROUP BY 1;

big cardinality, 1 distinct aggr

SELECT ss_ticket_number, Sum(ss_quantity), Count(DISTINCT ss_customer_sk) FROM hive.tpcds_sf1000_orc.store_sales GROUP BY ss_ticket_number;

big cardinality, 2 distinct aggrs

SELECT ss_ticket_number, Sum(ss_quantity), Count(DISTINCT ss_customer_sk), Count(DISTINCT ss_promo_sk) FROM hive.tpcds_sf1000_orc.store_sales GROUP BY ss_ticket_number;

| partitioning | aggregation mode | cardinality | # distinct aggregations | duration | CPU |
| --- | --- | --- | --- | --- | --- |
| unpart | optimize_mixed_distinct_aggregations | small | 1 | 10.41 | 1661 |
| unpart | MarkDistinct | small | 1 | 13.27 | 1176 |
| unpart | distinct aggregation | small | 1 | 53.58 | 1193 |
| unpart | MarkDistinct | small | 2 | 32.37 | 2813 |
| unpart | distinct aggregation | small | 2 | 1:15 | 1902 |
| unpart | optimize_mixed_distinct_aggregations | large | 1 | 19.77 | 2343 |
| unpart | MarkDistinct | large | 1 | 20.52 | 1446 |
| unpart | distinct aggregation | large | 1 | 18.32 | 1315 |
| unpart | MarkDistinct | large | 2 | 1:07 | 6591 |
| unpart | distinct aggregation | large | 2 | 35.63 | 3609 |
| unpart | optimize_mixed_distinct_aggregations | medium | 1 | 11.14 | 1637 |
| unpart | MarkDistinct | medium | 1 | 14.66 | 1219 |
| unpart | distinct aggregation | medium | 1 | 14.06 | 1190 |
| unpart | MarkDistinct | medium | 2 | 36.71 | 3641 |
| unpart | distinct aggregation | medium | 2 | 20.62 | 2446 |
| part | optimize_mixed_distinct_aggregations | small | 1 | 11.57 | 1721 |
| part | MarkDistinct | small | 1 | 15.12 | 1175 |
| part | distinct aggregation | small | 1 | 55.74 | 1188 |
| part | MarkDistinct | small | 2 | 34.6 | 2837 |
| part | distinct aggregation | small | 2 | 1:14 | 1863 |
| part | optimize_mixed_distinct_aggregations | large | 1 | 24.1 | 2619 |
| part | MarkDistinct | large | 1 | 24.91 | 1561 |
| part | distinct aggregation | large | 1 | 18.77 | 1349 |
| part | MarkDistinct | large | 2 | 1:08 | 6701 |
| part | distinct aggregation | large | 2 | 35.27 | 3623 |
| part | optimize_mixed_distinct_aggregations | medium | 1 | 13.07 | 1820 |
| part | MarkDistinct | medium | 1 | 14.81 | 1263 |
| part | distinct aggregation | medium | 1 | 14.15 | 1216 |
| part | MarkDistinct | medium | 2 | 36.5 | 3574 |
| part | distinct aggregation | medium | 2 | 19.67 | 2461 |
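
To compare rows of the results above, the mixed duration formats need to be normalized first. The following is a minimal sketch, assuming that plain numbers are seconds and that values such as 1:15 are minutes:seconds; the PR does not state the units, and the class and method names are hypothetical.

```java
import java.util.Locale;

// Hypothetical helper for comparing durations from the results table.
final class DurationComparison
{
    private DurationComparison() {}

    // Assumption: "53.58" means seconds and "1:15" means minutes:seconds.
    static double parseSeconds(String duration)
    {
        String[] parts = duration.trim().split(":");
        if (parts.length == 1) {
            return Double.parseDouble(parts[0]);
        }
        if (parts.length == 2) {
            return Integer.parseInt(parts[0]) * 60 + Double.parseDouble(parts[1]);
        }
        throw new IllegalArgumentException("Unexpected duration: " + duration);
    }

    // A ratio above 1 means the baseline took longer than the candidate.
    static double ratio(String baseline, String candidate)
    {
        return parseSeconds(baseline) / parseSeconds(candidate);
    }

    public static void main(String[] args)
    {
        // unpart, small cardinality, 2 distinct aggregations:
        // distinct aggregation (1:15) relative to MarkDistinct (32.37)
        System.out.printf(Locale.ROOT, "small, 2 aggs: %.2fx%n", ratio("1:15", "32.37"));
        // unpart, large cardinality, 2 distinct aggregations:
        // MarkDistinct (1:07) relative to distinct aggregation (35.63)
        System.out.printf(Locale.ROOT, "large, 2 aggs: %.2fx%n", ratio("1:07", "35.63"));
    }
}
```

Both ratios come out above 1, which matches the summary that MarkDistinct is faster for small NDV and distinct aggregation is faster for high NDV.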

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(X) Release notes are required, with the following suggested text:

# General
* Improve performance of queries with distinct aggregations using table statistics.
Whether the `MarkDistinct` operator is used for distinct aggregations is now configurable using a new configuration property, `optimizer.mark-distinct-strategy`, with the values `NONE`, `ALWAYS` and `AUTOMATIC`, where `AUTOMATIC` is the default.
The `optimizer.use-mark-distinct` configuration property is deprecated in favour of the new property. If used, its values are mapped to the `optimizer.mark-distinct-strategy` property as `true` -> `AUTOMATIC`, `false` -> `NONE`.

@cla-bot cla-bot bot added the cla-signed label Feb 1, 2023
@lukasz-stec lukasz-stec force-pushed the ls/063-use-distinct-aggregation-by-default branch 3 times, most recently from 8a1a763 to e113af3 Compare February 3, 2023 09:26
@lukasz-stec lukasz-stec marked this pull request as ready for review February 3, 2023 09:27
@lukasz-stec lukasz-stec requested review from Dith3r and sopel39 February 3, 2023 09:28

@sopel39 sopel39 left a comment

Some comments. I think the approach is fine (although it might hurt from NDV misestimation, but it's life). Please run benchmarks. I'm curious if we will see some gains

}
int numberOfThreadsInACluster = taskCountEstimator.estimateSourceDistributedTaskCount(context.getSession()) * getTaskConcurrency(context.getSession());

if (numberOfDistinctValues <= MARK_DISTINCT_MAX_OUTPUT_ROW_COUNT_MULTIPLIER * numberOfThreadsInACluster) {
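
The condition in this fragment can be illustrated with a standalone sketch. It assumes the check compares the estimated number of distinct values of the grouping keys with a multiple of the cluster-wide thread count. The class name, the parameter names, and the multiplier value used in `main` are hypothetical, and rejecting an unknown (NaN) estimate is a choice made for this sketch only, not a statement about what the rule does.

```java
// Hypothetical, simplified version of the threshold check in the diff above.
final class MarkDistinctHeuristicSketch
{
    private MarkDistinctHeuristicSketch() {}

    // Returns true when the estimated NDV is small relative to the available
    // parallelism, which is the case where MarkDistinct is expected to help.
    static boolean preferMarkDistinct(
            double numberOfDistinctValues,
            int sourceDistributedTaskCount,
            int taskConcurrency,
            int multiplier)
    {
        if (Double.isNaN(numberOfDistinctValues)) {
            // Sketch-only choice: require a known estimate.
            throw new IllegalArgumentException("NDV estimate is unknown");
        }
        long numberOfThreadsInACluster = (long) sourceDistributedTaskCount * taskConcurrency;
        return numberOfDistinctValues <= (double) multiplier * numberOfThreadsInACluster;
    }

    public static void main(String[] args)
    {
        // Assumed cluster: 10 source tasks x 16 threads, multiplier 8 (illustrative values only).
        System.out.println(preferMarkDistinct(502, 10, 16, 8));
        System.out.println(preferMarkDistinct(10_000, 10, 16, 8));
        System.out.println(preferMarkDistinct(240_000_000d, 10, 16, 8));
    }
}
```

With these assumed inputs the threshold is 8 * 160 = 1280 distinct values, so only the small-cardinality case selects MarkDistinct. Because the decision depends on the NDV estimate, a misestimate on either side of the threshold changes the chosen plan.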

@sopel39 sopel39 Feb 3, 2023

One disadvantage of not using MarkDistinct is that without MarkDistinct partial aggregations are not possible. Maybe it's not a problem, but I could imagine it might be if data is already pre-partitioned as MarkDistinct requires

Member Author

Can you think of a case where this can happen (query)? I could then test it and add this case to the rule.
Can partial aggregation be pushed through MarkDistinct? If not, the biggest advantage of partial aggregation goes away, as the data is shuffled across the network anyway, right?

import static io.trino.sql.planner.iterative.rule.test.PlanBuilder.expression;
import static io.trino.sql.planner.plan.AggregationNode.Step.SINGLE;

public class TestMultipleDistinctAggregationToMarkDistinct
Member

nit: We could also disable MarkDistinct when number of stages would exceed limit, but this can be another PR and it's not that simple...

@lukasz-stec lukasz-stec force-pushed the ls/063-use-distinct-aggregation-by-default branch from e113af3 to b3bb96d Compare February 3, 2023 15:47
@lukasz-stec lukasz-stec requested a review from sopel39 February 3, 2023 15:52
@lukasz-stec

tpch/tpcds orc part sf1k benchmarks look good (4 to 5% CPU + similar for duration), but this may be variability, as queries that don't have distinct aggregation also speed up (e.g. tpch q17, q18, q21).

image

oss-orc-part-sf1K-020223.pdf

@sopel39

sopel39 commented Feb 6, 2023

> tpch/tpcds orc part sf1k benchmarks look good (4 to 5% CPU + similar for duration), but this may be variability, as queries that don't have distinct aggregation also speed up (e.g. tpch q17, q18, q21).

The network didn't go down so I'm not sure that rule triggered

@lukasz-stec

> > tpch/tpcds orc part sf1k benchmarks look good (4 to 5% CPU + similar for duration), but this may be variability, as queries that don't have distinct aggregation also speed up (e.g. tpch q17, q18, q21).
>
> The network didn't go down so I'm not sure that rule triggered

@sopel39 I checked, there are no tpch/tpcds queries that would want to use distinct aggregation. The rule triggers for some queries that have global aggregation but uses MarkDistinct for those as expected.

@lukasz-stec lukasz-stec force-pushed the ls/063-use-distinct-aggregation-by-default branch from b3bb96d to 2f8f3dc Compare February 13, 2023 11:41
@github-actions github-actions bot added the docs label Feb 13, 2023
@lukasz-stec

Following the discussion, I changed the approach to the use_mark_distinct property. Instead of changing its type to varchar, which would make it backward incompatible, I added a new property, mark_distinct_strategy, that overrides the use_mark_distinct value if both are set.
Another open issue, raised by @raunaqmorarka here #15927 (comment), is whether we can rely on NDV stats to disable MarkDistinct. There is a tradeoff: in most cases this should work fine, or at least not make things worse, except when we overestimate a small NDV (e.g. we estimate tens of thousands instead of tens of distinct values). This can happen with multiple, strongly correlated grouping keys where one has low cardinality.
cc @sopel39 @raunaqmorarka
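
The precedence described here, combined with the `true` -> `AUTOMATIC`, `false` -> `NONE` mapping from the proposed release note, can be sketched as follows. This is an illustration under assumptions and not the PR's implementation: the class, enum, and method names are hypothetical, and modelling unset properties as empty `Optional` values is a simplification.

```java
import java.util.Optional;

// Hypothetical sketch of how the two properties could be combined.
final class MarkDistinctStrategySketch
{
    private MarkDistinctStrategySketch() {}

    // Values listed in the proposed release note.
    enum Strategy { NONE, ALWAYS, AUTOMATIC }

    // Mapping of the deprecated boolean property, as given in the release note.
    static Strategy fromLegacy(boolean useMarkDistinct)
    {
        return useMarkDistinct ? Strategy.AUTOMATIC : Strategy.NONE;
    }

    // The new property wins when both are set; with neither set, the
    // release note's default (AUTOMATIC) applies.
    static Strategy resolve(Optional<Strategy> markDistinctStrategy, Optional<Boolean> useMarkDistinct)
    {
        if (markDistinctStrategy.isPresent()) {
            return markDistinctStrategy.get();
        }
        return useMarkDistinct
                .map(MarkDistinctStrategySketch::fromLegacy)
                .orElse(Strategy.AUTOMATIC);
    }

    public static void main(String[] args)
    {
        // Both set: the new property overrides the legacy one.
        System.out.println(resolve(Optional.of(Strategy.ALWAYS), Optional.of(false)));
        // Only the legacy property set.
        System.out.println(resolve(Optional.empty(), Optional.of(false)));
        // Neither set.
        System.out.println(resolve(Optional.empty(), Optional.empty()));
    }
}
```

Keeping the legacy boolean as a separate input, rather than widening its type, is what lets existing `true`/`false` settings keep working unchanged.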

@lukasz-stec lukasz-stec force-pushed the ls/063-use-distinct-aggregation-by-default branch from 2f8f3dc to 2afa052 Compare February 28, 2023 10:36

@raunaqmorarka raunaqmorarka left a comment

lgtm

@lukasz-stec lukasz-stec force-pushed the ls/063-use-distinct-aggregation-by-default branch from 2afa052 to bd83187 Compare February 28, 2023 11:38
@lukasz-stec lukasz-stec force-pushed the ls/063-use-distinct-aggregation-by-default branch from bd83187 to 021a4a0 Compare March 1, 2023 07:36
@lukasz-stec lukasz-stec force-pushed the ls/063-use-distinct-aggregation-by-default branch from 021a4a0 to e245caf Compare March 1, 2023 15:29
@lukasz-stec

TestOptimizerConfig fixed + rebased on master (there were CI failures that may have been associated with the old master)
